In [1]:
import os
import sys
import ruamel.yaml as yaml
from ddf_utils.chef.api import Chef
In [ ]:
In [2]:
# Start by initializing an empty Chef; the following cells configure it step by step.
chef = Chef()
In [3]:
# Add configuration and metadata (the metadata goes to the info section).
# `ddf_dir` is set to a folder under the current user's home directory.
# The two calls are chained, so the value displayed by this cell is whatever
# add_metadata() returns.
(chef.add_config(ddf_dir=os.path.expanduser('~/src/work/Gapminder/datasets'))
.add_metadata(id='test_dataset',
base=['ddf--transpint--corrupton']))
Out[3]:
In [4]:
# Add an ingredient; the parameters are the same ones used in a recipe file.
# The YAML text is parsed with a round-trip YAML() loader object because the
# module-level round_trip_load() helper is deprecated and no longer available
# in recent ruamel.yaml releases.
i = '''
id: cpi-datapoints
dataset: ddf--transpint--corrupton
key: country, year
value: "*"
'''
d = yaml.YAML(typ='rt').load(i)
# The parsed mapping is unpacked into keyword arguments.
chef.add_ingredient(**d)
Out[4]:
In [5]:
# Add another ingredient, this time passing the parameters directly as keywords.
chef.add_ingredient(
    id='bp-datapoints',
    dataset='ddf--bp--energy',
    key='geo, year',
    value='*',
)
Out[5]:
In [ ]:
In [6]:
# Add a procedure; the parameters are the same as in a recipe.
# This one applies 'translate_header' to 'bp-datapoints' with the mapping
# geo -> country (presumably so its key matches 'cpi-datapoints' -- confirm)
# and names the outcome 'bp-datapoints-translate'.
chef.add_procedure(collection='datapoints', # the part of the cooking section this procedure belongs to
procedure='translate_header',
ingredients=['bp-datapoints'],
result='bp-datapoints-translate',
options={'dictionary': {'geo': 'country'}})
Out[6]:
In [7]:
# Add another procedure: 'merge' takes the translated bp datapoints together
# with 'cpi-datapoints' and names its result 'res'.
chef.add_procedure(collection='datapoints',
procedure='merge',
ingredients=['bp-datapoints-translate', 'cpi-datapoints'],
result='res')
Out[7]:
In [8]:
# Print the recipe built so far in YAML format.
# to_recipe() is called only for what it prints, so its return value is not
# bound to a name (the same way the later to_recipe() cells call it).
chef.to_recipe()
In [9]:
# The recipe can also be shown as a graph.
chef.to_graph()
Out[9]:
In [10]:
# Run the recipe and keep the output in `res`.
res = chef.run()
In [11]:
# Display the output of the run.
res
Out[11]:
In [12]:
# List the keys of the data held by the first item of the run output.
res[0].get_data().keys()
Out[12]:
In [13]:
# Show five rows of the 'biofuels_production_kboed' entry. A fixed random_state
# makes the sample (and therefore the saved cell output) the same on every run.
res[0].get_data()['biofuels_production_kboed'].sample(5, random_state=0)
Out[13]:
In [14]:
# Show five rows of the 'cpi' entry. A fixed random_state makes the sample
# (and therefore the saved cell output) the same on every run.
res[0].get_data()['cpi'].sample(5, random_state=0)
Out[14]:
In [ ]:
In [ ]:
In [15]:
# A Chef instance can also be created from an existing recipe file.
recipe_file = '../tests/recipes_pass/test_flatten.yml'
# Read through a context manager so the file handle is closed as soon as the
# contents have been printed, instead of being left open.
with open(os.path.abspath(recipe_file)) as recipe_fp:
    print(recipe_fp.read())
In [16]:
# Build the Chef from the recipe file.
# The dataset directory is located relative to the recipe file
# (<tests>/recipes_pass/../datasets) instead of through a user-specific absolute
# path, so the cell works on any checkout. `recipe_file` contains no environment
# variables, so it is passed as-is.
chef = Chef.from_recipe(
    recipe_file,
    ddf_dir=os.path.abspath(
        os.path.join(os.path.dirname(recipe_file), os.pardir, 'datasets')),
)
In [17]:
# Show the recipe held by the chef that was loaded from the file.
chef.to_recipe()
In [18]:
# Run the loaded recipe; `res` now refers to this run's output.
res = chef.run()
In [19]:
# List the keys of the data held by the first item of the run output.
res[0].get_data().keys()
Out[19]:
In [20]:
# Display the entry stored under the 'agriculture_percentage_f' key.
res[0].get_data()['agriculture_percentage_f']
Out[20]:
In [ ]:
In [ ]:
In [21]:
from ddf_utils.chef.helpers import gen_sym
In [22]:
# Start over with a fresh Chef for the next example (procedures added in a loop).
chef = Chef()
In [23]:
# Set the `ddf_dir` config to the datasets folder under the user's home directory.
chef.add_config(ddf_dir=os.path.expanduser('~/src/work/Gapminder/datasets'))
Out[23]:
In [24]:
# Register the ingredient the loop below works on: the 'population' value from
# the 'ddf--unpop--wpp_population_semio' dataset, keyed by country_code, year and age.
chef.add_ingredient(id='population_by_age_dps',
dataset='ddf--unpop--wpp_population_semio',
key='country_code,year,age',
value=['population'])
Out[24]:
In [25]:
# For each age bracket, queue three procedures on the chef:
#   filter_row (keep the bracket's ages) -> groupby (sum population per
#   country_code/year) -> translate_header (rename 'population' to the
#   bracket's indicator name).
# The id of each bracket's last step is collected in `to_merge`.
collection = 'datapoints'
groups = [[str(age) for age in range(start, stop)]
          for start, stop in ((0, 5), (5, 10), (10, 20))]
names = ['population_0_4', 'population_5_9', 'population_10_19']
ingredients_0 = ['population_by_age_dps']


def _add_step(procedure, ingredients, options):
    """Queue one procedure on ``chef`` and return the id generated for its result."""
    # gen_sym supplies the result id from the procedure name, inputs and options.
    result = gen_sym(procedure, ingredients, options)
    chef.add_procedure(collection=collection,
                       procedure=procedure,
                       ingredients=ingredients,
                       options=options,
                       result=result)
    return result


to_merge = []
for ages, indicator in zip(groups, names):
    filtered = _add_step(
        'filter_row',
        ingredients_0,
        {'filters': {'population': {'age': ages}}},
    )
    summed = _add_step(
        'groupby',
        [filtered],
        {'groupby': ['country_code', 'year'],
         'aggregate': {'population': 'sum'}},
    )
    renamed = _add_step(
        'translate_header',
        [summed],
        {'dictionary': {'population': indicator}},
    )
    to_merge.append(renamed)
In [26]:
# Merge the per-bracket ingredients collected in `to_merge` into one ingredient
# named 'result'.
chef.add_procedure(collection=collection,
procedure='merge',
ingredients=to_merge,
result='result'
)
Out[26]:
In [27]:
# Show the recipe generated by the procedures added above.
chef.to_recipe()
In [ ]:
In [28]:
# Show the same recipe as a graph.
chef.to_graph()
Out[28]:
In [ ]:
In [29]:
# Run the recipe; `res` now refers to this run's output.
res = chef.run()
In [30]:
# Display the output of the run.
res
Out[30]:
In [31]:
# List the keys of the data held by the first item of the run output.
res[0].get_data().keys()
Out[31]:
In [ ]:
In [ ]:
In [ ]:
In [32]:
# Fresh Chef for the next cells, which pass unusual keys to add_ingredient / add_procedure.
chef = Chef()
In [33]:
# Ingredient definition that carries an additional, arbitrary key (`snieot`)
# next to the usual ones.
# Parsed with a round-trip YAML() loader object because the module-level
# round_trip_load() helper is deprecated and no longer available in recent
# ruamel.yaml releases.
i = '''
id: cpi-datapoints
dataset: ddf--transpint--corrupton
key: country, year
value: "*"
snieot: 'soneot'
'''
d = yaml.YAML(typ='rt').load(i)
chef.add_ingredient(**d)
Out[33]:
In [34]:
# Ingredient definition where the `key` entry is replaced by a differently
# named one. The cell expects add_ingredient() to raise KeyError for it; the
# error is caught and its message printed so the notebook keeps running.
# Parsed with a round-trip YAML() loader object because the module-level
# round_trip_load() helper is deprecated and no longer available in recent
# ruamel.yaml releases.
i = '''
id: cpi-datapoints_
dataset: ddf--transpint--corrupton
oh_my_interesting_key: country, year # error
value: "*"
'''
d = yaml.YAML(typ='rt').load(i)
try:
    chef.add_ingredient(**d)
except KeyError as e:
    print(str(e))
In [35]:
# A procedure can be described in YAML as well; the collection is passed
# positionally and the parsed mapping supplies the remaining keyword arguments.
# Parsed with a round-trip YAML() loader object because the module-level
# round_trip_load() helper is deprecated and no longer available in recent
# ruamel.yaml releases.
i = '''
procedure: my_new_procedure
ingredients:
- testing
options:
opt: val
result: result
'''
d = yaml.YAML(typ='rt').load(i)
chef.add_procedure('datapoints', **d)
Out[35]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [36]:
from ddf_utils.chef.ingredient import Ingredient, ProcedureResult
In [49]:
def multiply_1000(chef, ingredients, result, **options):
    """Custom procedure: multiply the first ingredient's values by 1000.

    Parameters
    ----------
    chef
        object whose ``dag.get_node`` resolves ingredient ids to nodes
    ingredients : list of str
        ingredient ids; every id is looked up, but only the first node is used
    result : str
        id handed to the returned ProcedureResult
    **options
        accepted for signature compatibility; not used

    Returns
    -------
    ProcedureResult
        built from ``chef``, ``result``, the first ingredient's key and the
        scaled data
    """
    def _times_1000(frame, column):
        # Work on a copy so the source ingredient's frame is left untouched.
        scaled = frame.copy()
        scaled[column] = scaled[column] * 1000
        return scaled

    nodes = list(map(chef.dag.get_node, ingredients))
    source = nodes[0].evaluate()
    # Each entry of get_data() maps a name to a frame holding a column of that name.
    scaled_data = {
        name: _times_1000(frame, name)
        for name, frame in source.get_data().items()
    }
    return ProcedureResult(chef, result, source.key, scaled_data)
In [50]:
# Fresh Chef for the custom-procedure example, with `ddf_dir` set to the
# datasets folder under the user's home directory.
chef = Chef()
chef.add_config(ddf_dir=os.path.expanduser('~/src/work/Gapminder/datasets'))
Out[50]:
In [51]:
# Add the ingredient the custom procedure will work on.
# Parsed with a round-trip YAML() loader object because the module-level
# round_trip_load() helper is deprecated and no longer available in recent
# ruamel.yaml releases.
i = '''
id: cpi-datapoints
dataset: ddf--transpint--corrupton
key: country, year
value: "*"
'''
d = yaml.YAML(typ='rt').load(i)
chef.add_ingredient(**d)
Out[51]:
In [52]:
# Register the custom function with the chef; the next cell refers to it by
# its name, 'multiply_1000'.
chef.register_procedure(multiply_1000)
In [53]:
# Use the registered procedure by name on 'cpi-datapoints'; the result is named 'res'.
chef.add_procedure(collection='datapoints',
procedure='multiply_1000',
result='res',
ingredients=['cpi-datapoints']
)
Out[53]:
In [54]:
# Run the recipe that contains the custom procedure.
res = chef.run()
In [55]:
# First five rows of the 'cpi' entry produced by the run.
res[0].get_data()['cpi'].head(5)
Out[55]:
In [56]:
# Inspect the ingredients held by the chef.
chef.ingredients
Out[56]:
In [57]:
# First rows of the original ingredient's 'cpi' data, for comparison with the
# procedure result shown above.
chef.ingredients[0].get_data()['cpi'].head() # the original
Out[57]:
In [58]:
# Show the recipe, including the custom procedure, as a graph.
chef.to_graph()
Out[58]:
In [ ]: